package com.base;


import canal.bean.RowData;
import com.utils.CanalRowDataDeserialzerSchema;
import com.utils.KafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MySqlBaseETL implements BaseETL<RowData> {

    /**
     * Builds a Kafka-backed Flink source that emits canal {@link RowData}
     * change records from the given topic.
     *
     * <p>NOTE(review): the method name {@code KafkaConsummer} is misspelled and
     * violates lowerCamelCase, but it overrides the {@code BaseETL} interface,
     * so renaming must happen at the interface level first.
     *
     * @param topic Kafka topic to consume from
     * @param env   Flink execution environment the source is registered on
     * @return a {@code DataStream} of deserialized {@code RowData} events
     * @throws Exception declared by the {@code BaseETL} interface contract
     */
    @Override
    public DataStream<RowData> KafkaConsummer(String topic, StreamExecutionEnvironment env) throws Exception {
        // Connect Flink to Kafka; CanalRowDataDeserialzerSchema decodes the raw
        // message bytes into RowData. Connection properties come from the shared
        // KafkaUtil.kafkaProps configuration.
        FlinkKafkaConsumer<RowData> consumer = new FlinkKafkaConsumer<>(
                topic,
                new CanalRowDataDeserialzerSchema(),
                KafkaUtil.kafkaProps);

        // Start from the earliest available offset so a fresh job also replays
        // historical records (overrides any committed group offsets).
        consumer.setStartFromEarliest();

        return env.addSource(consumer);
    }
}
